1   /*
2    * Copyright (C) 2007 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.eventbus;
18  
19  import static com.google.common.base.Preconditions.checkNotNull;
20  
21  import com.google.common.annotations.Beta;
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.base.Throwables;
24  import com.google.common.cache.CacheBuilder;
25  import com.google.common.cache.CacheLoader;
26  import com.google.common.cache.LoadingCache;
27  import com.google.common.collect.HashMultimap;
28  import com.google.common.collect.Multimap;
29  import com.google.common.collect.SetMultimap;
30  import com.google.common.reflect.TypeToken;
31  import com.google.common.util.concurrent.UncheckedExecutionException;
32  
33  import java.lang.reflect.InvocationTargetException;
34  import java.util.Collection;
35  import java.util.LinkedList;
36  import java.util.Map.Entry;
37  import java.util.Queue;
38  import java.util.Set;
39  import java.util.concurrent.locks.ReadWriteLock;
40  import java.util.concurrent.locks.ReentrantReadWriteLock;
41  import java.util.logging.Level;
42  import java.util.logging.Logger;
43  
44  /**
45   * Dispatches events to listeners, and provides ways for listeners to register
46   * themselves.
47   *
48   * <p>The EventBus allows publish-subscribe-style communication between
49   * components without requiring the components to explicitly register with one
50   * another (and thus be aware of each other).  It is designed exclusively to
51   * replace traditional Java in-process event distribution using explicit
52   * registration. It is <em>not</em> a general-purpose publish-subscribe system,
53   * nor is it intended for interprocess communication.
54   *
55   * <h2>Receiving Events</h2>
56   * <p>To receive events, an object should:
57   * <ol>
58   * <li>Expose a public method, known as the <i>event subscriber</i>, which accepts
59   *     a single argument of the type of event desired;</li>
60   * <li>Mark it with a {@link Subscribe} annotation;</li>
61   * <li>Pass itself to an EventBus instance's {@link #register(Object)} method.
62   *     </li>
63   * </ol>
64   *
65   * <h2>Posting Events</h2>
66   * <p>To post an event, simply provide the event object to the
67   * {@link #post(Object)} method.  The EventBus instance will determine the type
68   * of event and route it to all registered listeners.
69   *
70   * <p>Events are routed based on their type &mdash; an event will be delivered
71   * to any subscriber for any type to which the event is <em>assignable.</em>  This
72   * includes implemented interfaces, all superclasses, and all interfaces
73   * implemented by superclasses.
74   *
75   * <p>When {@code post} is called, all registered subscribers for an event are run
76   * in sequence, so subscribers should be reasonably quick.  If an event may trigger
77   * an extended process (such as a database load), spawn a thread or queue it for
78   * later.  (For a convenient way to do this, use an {@link AsyncEventBus}.)
79   *
80   * <h2>Subscriber Methods</h2>
81   * <p>Event subscriber methods must accept only one argument: the event.
82   *
83   * <p>Subscribers should not, in general, throw.  If they do, the EventBus will
84   * catch and log the exception.  This is rarely the right solution for error
85   * handling and should not be relied upon; it is intended solely to help find
86   * problems during development.
87   *
88   * <p>The EventBus guarantees that it will not call a subscriber method from
89   * multiple threads simultaneously, unless the method explicitly allows it by
90   * bearing the {@link AllowConcurrentEvents} annotation.  If this annotation is
91   * not present, subscriber methods need not worry about being reentrant, unless
92   * also called from outside the EventBus.
93   *
94   * <h2>Dead Events</h2>
95   * <p>If an event is posted, but no registered subscribers can accept it, it is
96   * considered "dead."  To give the system a second chance to handle dead events,
97   * they are wrapped in an instance of {@link DeadEvent} and reposted.
98   *
99   * <p>If a subscriber for a supertype of all events (such as Object) is registered,
100  * no event will ever be considered dead, and no DeadEvents will be generated.
101  * Accordingly, while DeadEvent extends {@link Object}, a subscriber registered to
102  * receive any Object will never receive a DeadEvent.
103  *
104  * <p>This class is safe for concurrent use.
105  * 
106  * <p>See the Guava User Guide article on <a href=
107  * "http://code.google.com/p/guava-libraries/wiki/EventBusExplained">
108  * {@code EventBus}</a>.
109  *
110  * @author Cliff Biffle
111  * @since 10.0
112  */
113 @Beta
114 public class EventBus {
115 
116   /**
117    * A thread-safe cache for flattenHierarchy(). The Class class is immutable. This cache is shared
118    * across all EventBus instances, which greatly improves performance if multiple such instances
119    * are created and objects of the same class are posted on all of them.
120    */
121   private static final LoadingCache<Class<?>, Set<Class<?>>> flattenHierarchyCache =
122       CacheBuilder.newBuilder()
123           .weakKeys()
124           .build(new CacheLoader<Class<?>, Set<Class<?>>>() {
125             @SuppressWarnings({"unchecked", "rawtypes"}) // safe cast
126             @Override
127             public Set<Class<?>> load(Class<?> concreteClass) {
128               return (Set) TypeToken.of(concreteClass).getTypes().rawTypes();
129             }
130           });
131 
132   /**
133    * All registered event subscribers, indexed by event type.
134    *
135    * <p>This SetMultimap is NOT safe for concurrent use; all access should be
136    * made after acquiring a read or write lock via {@link #subscribersByTypeLock}.
137    */
138   private final SetMultimap<Class<?>, EventSubscriber> subscribersByType =
139       HashMultimap.create();
140   private final ReadWriteLock subscribersByTypeLock = new ReentrantReadWriteLock();
141 
142   /**
143    * Strategy for finding subscriber methods in registered objects.  Currently,
144    * only the {@link AnnotatedSubscriberFinder} is supported, but this is
145    * encapsulated for future expansion.
146    */
147   private final SubscriberFindingStrategy finder = new AnnotatedSubscriberFinder();
148 
149   /** queues of events for the current thread to dispatch */
150   private final ThreadLocal<Queue<EventWithSubscriber>> eventsToDispatch =
151       new ThreadLocal<Queue<EventWithSubscriber>>() {
152     @Override protected Queue<EventWithSubscriber> initialValue() {
153       return new LinkedList<EventWithSubscriber>();
154     }
155   };
156 
157   /** true if the current thread is currently dispatching an event */
158   private final ThreadLocal<Boolean> isDispatching =
159       new ThreadLocal<Boolean>() {
160     @Override protected Boolean initialValue() {
161       return false;
162     }
163   };
164 
165   private SubscriberExceptionHandler subscriberExceptionHandler;
166 
167   /**
168    * Creates a new EventBus named "default".
169    */
170   public EventBus() {
171     this("default");
172   }
173 
174   /**
175    * Creates a new EventBus with the given {@code identifier}.
176    *
177    * @param identifier  a brief name for this bus, for logging purposes.  Should
178    *                    be a valid Java identifier.
179    */
180   public EventBus(String identifier) {
181     this(new LoggingSubscriberExceptionHandler(identifier));
182   }
183 
184   /**
185    * Creates a new EventBus with the given {@link SubscriberExceptionHandler}.
186    * 
187    * @param subscriberExceptionHandler Handler for subscriber exceptions.
188    * @since 16.0
189    */
190   public EventBus(SubscriberExceptionHandler subscriberExceptionHandler) {
191     this.subscriberExceptionHandler = checkNotNull(subscriberExceptionHandler);
192   }
193 
194   /**
195    * Registers all subscriber methods on {@code object} to receive events.
196    * Subscriber methods are selected and classified using this EventBus's
197    * {@link SubscriberFindingStrategy}; the default strategy is the
198    * {@link AnnotatedSubscriberFinder}.
199    *
200    * @param object  object whose subscriber methods should be registered.
201    */
202   public void register(Object object) {
203     Multimap<Class<?>, EventSubscriber> methodsInListener =
204         finder.findAllSubscribers(object);
205     subscribersByTypeLock.writeLock().lock();
206     try {
207       subscribersByType.putAll(methodsInListener);
208     } finally {
209       subscribersByTypeLock.writeLock().unlock();
210     }
211   }
212 
213   /**
214    * Unregisters all subscriber methods on a registered {@code object}.
215    *
216    * @param object  object whose subscriber methods should be unregistered.
217    * @throws IllegalArgumentException if the object was not previously registered.
218    */
219   public void unregister(Object object) {
220     Multimap<Class<?>, EventSubscriber> methodsInListener = finder.findAllSubscribers(object);
221     for (Entry<Class<?>, Collection<EventSubscriber>> entry :
222           methodsInListener.asMap().entrySet()) {
223       Class<?> eventType = entry.getKey();
224       Collection<EventSubscriber> eventMethodsInListener = entry.getValue();
225 
226       subscribersByTypeLock.writeLock().lock();
227       try {
228         Set<EventSubscriber> currentSubscribers = subscribersByType.get(eventType);
229         if (!currentSubscribers.containsAll(eventMethodsInListener)) {
230           throw new IllegalArgumentException(
231               "missing event subscriber for an annotated method. Is " + object + " registered?");
232         }
233         currentSubscribers.removeAll(eventMethodsInListener);
234       } finally {
235         subscribersByTypeLock.writeLock().unlock();
236       }
237     }
238   }
239 
240   /**
241    * Posts an event to all registered subscribers.  This method will return
242    * successfully after the event has been posted to all subscribers, and
243    * regardless of any exceptions thrown by subscribers.
244    *
245    * <p>If no subscribers have been subscribed for {@code event}'s class, and
246    * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
247    * DeadEvent and reposted.
248    *
249    * @param event  event to post.
250    */
251   public void post(Object event) {
252     Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());
253 
254     boolean dispatched = false;
255     for (Class<?> eventType : dispatchTypes) {
256       subscribersByTypeLock.readLock().lock();
257       try {
258         Set<EventSubscriber> wrappers = subscribersByType.get(eventType);
259 
260         if (!wrappers.isEmpty()) {
261           dispatched = true;
262           for (EventSubscriber wrapper : wrappers) {
263             enqueueEvent(event, wrapper);
264           }
265         }
266       } finally {
267         subscribersByTypeLock.readLock().unlock();
268       }
269     }
270 
271     if (!dispatched && !(event instanceof DeadEvent)) {
272       post(new DeadEvent(this, event));
273     }
274 
275     dispatchQueuedEvents();
276   }
277 
278   /**
279    * Queue the {@code event} for dispatch during
280    * {@link #dispatchQueuedEvents()}. Events are queued in-order of occurrence
281    * so they can be dispatched in the same order.
282    */
283   void enqueueEvent(Object event, EventSubscriber subscriber) {
284     eventsToDispatch.get().offer(new EventWithSubscriber(event, subscriber));
285   }
286 
287   /**
288    * Drain the queue of events to be dispatched. As the queue is being drained,
289    * new events may be posted to the end of the queue.
290    */
291   void dispatchQueuedEvents() {
292     // don't dispatch if we're already dispatching, that would allow reentrancy
293     // and out-of-order events. Instead, leave the events to be dispatched
294     // after the in-progress dispatch is complete.
295     if (isDispatching.get()) {
296       return;
297     }
298 
299     isDispatching.set(true);
300     try {
301       Queue<EventWithSubscriber> events = eventsToDispatch.get();
302       EventWithSubscriber eventWithSubscriber;
303       while ((eventWithSubscriber = events.poll()) != null) {
304         dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
305       }
306     } finally {
307       isDispatching.remove();
308       eventsToDispatch.remove();
309     }
310   }
311 
312   /**
313    * Dispatches {@code event} to the subscriber in {@code wrapper}.  This method
314    * is an appropriate override point for subclasses that wish to make
315    * event delivery asynchronous.
316    *
317    * @param event  event to dispatch.
318    * @param wrapper  wrapper that will call the subscriber.
319    */
320   void dispatch(Object event, EventSubscriber wrapper) {
321     try {
322       wrapper.handleEvent(event);
323     } catch (InvocationTargetException e) {
324       try {
325         subscriberExceptionHandler.handleException(
326             e.getCause(),
327             new SubscriberExceptionContext(
328                 this,
329                 event,
330                 wrapper.getSubscriber(),
331                 wrapper.getMethod()));
332       } catch (Throwable t) {
333         // If the exception handler throws, log it. There isn't much else to do!
334         Logger.getLogger(EventBus.class.getName()).log(Level.SEVERE,
335              String.format(
336             "Exception %s thrown while handling exception: %s", t,
337             e.getCause()),
338             t);
339       }
340     }
341   }
342 
343   /**
344    * Flattens a class's type hierarchy into a set of Class objects.  The set
345    * will include all superclasses (transitively), and all interfaces
346    * implemented by these superclasses.
347    *
348    * @param concreteClass  class whose type hierarchy will be retrieved.
349    * @return {@code clazz}'s complete type hierarchy, flattened and uniqued.
350    */
351   @VisibleForTesting
352   Set<Class<?>> flattenHierarchy(Class<?> concreteClass) {
353     try {
354       return flattenHierarchyCache.getUnchecked(concreteClass);
355     } catch (UncheckedExecutionException e) {
356       throw Throwables.propagate(e.getCause());
357     }
358   }
359 
360   /**
361    * Simple logging handler for subscriber exceptions.
362    */
363   private static final class LoggingSubscriberExceptionHandler
364       implements SubscriberExceptionHandler {
365 
366     /**
367      * Logger for event dispatch failures.  Named by the fully-qualified name of
368      * this class, followed by the identifier provided at construction.
369      */
370     private final Logger logger;
371 
372     /**
373      * @param identifier a brief name for this bus, for logging purposes. Should
374      *        be a valid Java identifier.
375      */
376     public LoggingSubscriberExceptionHandler(String identifier) {
377       logger = Logger.getLogger(
378           EventBus.class.getName() + "." + checkNotNull(identifier));
379     }
380 
381     @Override
382     public void handleException(Throwable exception,
383         SubscriberExceptionContext context) {
384       logger.log(Level.SEVERE, "Could not dispatch event: " 
385           + context.getSubscriber() + " to " + context.getSubscriberMethod(),
386           exception.getCause());
387     }
388   }
389 
390   /** simple struct representing an event and it's subscriber */
391   static class EventWithSubscriber {
392     final Object event;
393     final EventSubscriber subscriber;
394     public EventWithSubscriber(Object event, EventSubscriber subscriber) {
395       this.event = checkNotNull(event);
396       this.subscriber = checkNotNull(subscriber);
397     }
398   }
399 }